Airflow Branchingのコード例を読み解く
code:py
@task.branch(task_id="branch_task") # ★2 もちろんデコレータでbranchですよーって指定はする
def branch_func(ti):
xcom_value = int(ti.xcom_pull(task_ids="start_task"))
if xcom_value >= 5:
# ★2 次に実行するタスクの名前を返すようにする
return "continue_task"
elif xcom_value >= 3:
return "stop_task"
else:
return None
start_op = BashOperator(
task_id="start_task",
bash_command="echo 5",
xcom_push=True,
dag=dag,
)
branch_op = branch_func()
continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)
# ★1 branch_op が次どっちを実行するか
# それは branch_op の中身次第
xcom_valueの値がどこで定義してどう増えてる?わかんねーけどsta.icon
tiって何?
---
デフォルトではreturn_valueというキーが使われるってのはわかった
echo 5だから5をセットしてるってこと?
どういうこと?
標準出力をセットするってことなの?
do_xcom_push (bool) – if True, an XCom is pushed containing the Operator’s result
なるほど、Trueにしたら「そのオペレーターの結果を自動でセットしますねん」か
この場合、echo 5の結果だから、5を入れてるね
xcom_push=True,
これはたぶんバージョン違いだと思う
古いairflowはxcom_push引数だったが、これだと紛らわしいので、今はdo_xcom_pushになってるんじゃねえかなと予想sta.icon
で、tiって何や?
Task Instance?
自動で入ってくるやつ?